State Machine Decorator Module
Contents
Overview
This module provides a set of decorators that are useful for implementing state machines of the type described by UML 2.0 state charts. The overhead of these decorators may be too high for them to be useful in parsing applications.
The code for the state machine decorator module is given below. Examples are given following the code.
License
Copyright (C) 2010, 2011 Rodney Drenth All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. Neither the name of the project nor the names of the author may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTERS``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Python Code
import types
import itertools
import logging
# Import-time side effect: configure the root logger to write to the file
# "gf_info.txt". With the level set to ERROR, the logging.debug() calls made
# by this module are filtered out.
logging.basicConfig(
    filename="gf_info.txt",
    format = "%(levelname)-10s %(message)s",
    level = logging.ERROR )
from functools import wraps
def truncated(alist, cmprsn):
    """Lazily yield items of alist up to the first one named cmprsn.

    Each item must have a __name__ attribute. Iteration stops (without
    yielding that item) at the first item whose __name__ equals cmprsn;
    if no item matches, every item is yielded.
    """
    for item in alist:
        if item.__name__ != cmprsn:
            yield item
        else:
            return
class ContextBase(object):
    """Empty base class for state machine (context) classes."""
class _StateVariable(object):
""" Attribute of a class to maintain state .
State Variable objects are instantiated indirectly via calls to the
TransitionTable class's initialize method. TransitionTable objects are created
at the class level.
"""
def __init__(self, transTable, context):
"""Constructor - set to initial state"""
self.__current_state = transTable.initialstate
self.__next_state = transTable.initialstate
self.sTable = transTable
self.__statestack=[]
self.__ctxClass = context.__class__
def toNextState(self, context):
"""Transition to next state, if a next_state is differnt.
In addition to the actual state transition, it invokes onLeave
and onEnter methods as required.
"""
if self.__next_state is not self.__current_state:
cc = context.__class__
tt_name = self.sTable.inst_state_name
logging.debug("Transitioning to state %s"%self.__next_state.__name__)
def callInState(methName, crnt_state):
if (hasattr(crnt_state, methName) or hasattr(context, methName)):
nmro = [crnt_state,]
nmro.extend(cc.__mro__)
psudoClassName = "%s_%s"%(cc.__name__, crnt_state.__name__)
stCls = type( psudoClassName, tuple(nmro), {})
context.__class__ = stCls
getattr(context, methName)() # call the onEnter or onLeave method here
context.__class__ = cc
callInState('onLeave', self.__current_state)
self.__setState( context )
callInState('onEnter', self.__current_state)
def __setState(self, context ):
"""low level funky called from toNextState"""
cc = context.__class__
mro = cc.__mro__
if ( self.__current_state not in mro):
self.__current_state = self.__next_state
return
logging.debug("Current state %s in mro"% self.__current_state.__name__)
def f(anc):
return self.__next_state if anc == self.__current_state else anc
newmro = tuple(f(anc) for anc in cc.__mro__)
tt_name = self.sTable.inst_state_name
cls_name ="%s_%s"%(self.__ctxClass.__name__, self.__next_state.__name__)
context.__class__ = type(cls_name, newmro, {})
def pushState(self, newState, context = None):
"""PushState - allows going to another state with intent of returning
to the current one."""
self.__statestack.append(self._current_state)
self.__next_state = newState
if context:
self.toNextState(context)
def popState(self, context = None):
"""Pop back to the previously pushed state (pushState)"""
self.__next_state = self.__statestack.pop()
if (context):
self.toNextState( context)
def name(self):
"""Return name of current state"""
return self.__current_state.__name__
def setXition(self, func):
""" Sets the state to transition to upon seeing a transtion event
This method should only be called by the decorators impl'd in this module.
"""
nxState = self.__current_state.nextStates[func.__name__]
if nxState is not None:
self.__next_state = nxState;
def getFunc(self, func, contxt):
"""Gets the state dependant action method, wrapped in a try-catch block.
This method should only be called by the decorators impl'd in this module.
"""
crnt = self.__current_state
svar_name = self.sTable.inst_state_name
svCtxt = self.__ctxClass
cc = contxt.__class__
pseudoclas = "%s_%s"%(cc.__name__, crnt.__name__)
nmro = [crnt]
lhead = itertools.takewhile( lambda x: x != svCtxt, crnt.__mro__)
if svCtxt in cc.__mro__:
ltail = itertools.dropwhile( lambda x: x!= svCtxt, cc.__mro__)
else:
ltail = cc.__mro__
nmro.extend(ltail)
logging.debug("%s - %s - %s - [%s]\n"%(func.__name__, cc.__name__,
svar_name, ", ".join( cls.__name__ for cls in truncated(nmro,'TopLevelWindow' ))))
stCls = type( pseudoclas, tuple(nmro), {})
contxt.__class__ = stCls
try:
funky = getattr(contxt, func.__name__)
except:
funky = None
contxt.__class__ = cc # revert...
if funky is None:
t = "'%s' has no attribute '%s' in state %s" % (self.name(),
func.__name__, crnt.__name__)
raise NotImplementedError(t)
# function with wrapping attribute means we've recursed all the way back
# to the context class and need to call the func as a default.
if hasattr(funky, "wrapping") and (funky.wrapping == self.sTable.inst_state_name):
def funcA(*args, **kwargs):
return func(contxt, *args, **kwargs)
funky = funcA
def wrappd2( self, *args, **kwargs):
# wrap in try - except in event that funky() does something funky
try:
self.__class__ = stCls
retn = funky( *args, **kwargs)
finally:
self.__class__ = cc
return retn
return wrappd2
# -----------------------------------------------------------------------------
class TransitionTable(object):
    """Defines a transition table for a state machine class.

    A transition table for a class is associated with the state variable in
    the instances of the class. The name of the state variable is given in
    the constructor of the TransitionTable object. TransitionTable objects
    are attributes of state machine classes, not of instances of the state
    machine class. A state machine class can have more than one
    TransitionTable.
    """

    def __init__(self, stateVarblName):
        """Transition table constructor.

        stateVarblName is the name of the associated instance state variable.
        The initial state must be assigned to 'initialstate' before
        initialize() is called.
        """
        self.inst_state_name = stateVarblName
        self.eventList = []
        # 'initialstate' is the attribute read when a state variable is
        # created; the constructor previously only set the misspelt
        # 'initalState', which is retained for backward compatibility.
        self.initialstate = None
        self.initalState = None

    def initialize(self, ctxt):
        """Create a new state variable in the context.

        The state variable references this transition table. Raises
        AttributeError if no initial state has been assigned.
        """
        if self.initialstate is None:
            raise AttributeError(
                "TransitionTable '%s' has no initialstate assigned"
                % self.inst_state_name)
        ctxt.__dict__[self.inst_state_name] = _StateVariable(self, ctxt)

    def _addEventHandler(self, funcName):
        """Notify the table of a method that handles a transition.

        This is called by two of the decorators implemented below. The order
        of the calls fixes the order expected by nextStates().
        """
        self.eventList.append(funcName)

    def nextStates(self, subState, nslList):
        """Set up transitions from the state specified by subState.

        subState is one of the derived state classes, subclassed from the
        context state machine class. nslList is a list of states to which
        the context will transition upon the invocation of one of the
        transition methods, given in the order the transition methods were
        registered. 'None' may be specified instead of an actual state if
        the context is to remain in the same state upon invocation of the
        corresponding method.

        Raises RuntimeError if nslList does not have one entry per
        registered transition method.
        """
        if len(nslList) != len(self.eventList):
            j = "Expected %s Got %s." % (len(self.eventList), len(nslList))
            raise RuntimeError(
                "Wrong number of states in transition list.\n%s" % j)
        subState.nextStates = dict(zip(self.eventList, nslList))
# -----------------------------------------------------------------------------
def event(state_table):
    """Decorator marking an Event (or 'Action') method.

    Apply it to methods of the state machine class whose behavior depends on
    the current state. States are implemented as subclasses of the context
    (state machine) class; when the decorated method is called, the
    implementation is obtained from the state variable named by state_table.
    """
    attr_name = state_table.inst_state_name

    def decorate(func):
        @wraps(func)
        def dispatcher(self, *args, **kwargs):
            handler = getattr(self, attr_name).getFunc(func, self)
            return handler(self, *args, **kwargs)

        # Marks the method as a decorated default for this state variable.
        dispatcher.wrapping = attr_name
        return dispatcher

    return decorate
def transition(state_table):
    """Decorator for methods which cause transitions between states.

    Apply it to methods of the context (state machine) class. Calling the
    method may move the context to another state; which one is defined with
    the nextStates method of the TransitionTable class. The method's name is
    registered with state_table at decoration time.
    """
    attr_name = state_table.inst_state_name

    def decorate(func):
        state_table._addEventHandler(func.__name__)

        @wraps(func)
        def trigger(self, *args, **kwargs):
            variable = getattr(self, attr_name)
            variable.setXition(func)
            # The method body runs before the state change takes effect.
            result = func(self, *args, **kwargs)
            variable.toNextState(self)
            return result

        trigger.wrapping = attr_name
        return trigger

    return decorate
def transitionevent(state_table):
    """Decorator combining the event and transition decorators.

    The decorated method both invokes the state dependent implementation
    and may trigger a state transition afterwards. Mostly equivalent to:
        @transition(xitionTable)
        @event(xitionTable)
    """
    attr_name = state_table.inst_state_name

    def decorate(func):
        state_table._addEventHandler(func.__name__)

        @wraps(func)
        def dispatcher(self, *args, **kwargs):
            variable = getattr(self, attr_name)
            variable.setXition(func)
            handler = variable.getFunc(func, self)
            result = handler(self, *args, **kwargs)
            # The transition happens after the state's method has run.
            variable.toNextState(self)
            return result

        dispatcher.wrapping = attr_name
        return dispatcher

    return decorate
Examples of Use
Simple Example
The example has three states, which rotate to the next state whenever the writeName method is called. In StateA, the text is printed out in lower case. In states StateB and StateC the text is printed out in upper case.
import DecoratorStateMachine as dsm
class StateContext(dsm.ContextBase):
    """Context (state machine) whose state variable is stored as 'myState'."""

    ttable = dsm.TransitionTable('myState')

    def __init__(self):
        """Create this instance's state variable from the class-level table."""
        self.ttable.initialize(self)

    @dsm.transitionevent(ttable)
    def writeName(self, name):
        """State dependent; the default implementation does nothing."""
class StateA(StateContext):
    """State in which names are written in lower case."""

    def writeName(self, name):
        lowered = name.lower()
        print(lowered)
class StateB(StateContext):
    """State in which names are written in upper case."""

    def writeName(self, name):
        shouted = name.upper()
        print(shouted)
class StateC(StateB):
    """State that inherits all of its behavior from StateB."""
# Set up the transition table so the states rotate: A -> B -> C -> A.
# Each tuple has one entry because writeName is the only transition method.
StateContext.ttable.nextStates(StateA, (StateB,))
StateContext.ttable.nextStates(StateB, (StateC,))
StateContext.ttable.nextStates(StateC, (StateA,))
StateContext.ttable.initialstate = StateA

if __name__ == '__main__':
    weekdays = (
        "Monday", "Tuesday", "Wednesday", "Thursday",
        "Friday", "Saturday", "Sunday",
    )
    machine = StateContext()
    for weekday in weekdays:
        # Every call writes the name and then moves to the next state.
        machine.writeName(weekday)
    # Wait for the user before exiting.
    x = raw_input("done>")
Output
monday TUESDAY WEDNESDAY thursday FRIDAY SATURDAY sunday
Miss Grant's Controller
The specification for this controller comes from Martin Fowler. This example uses wxPython as well as the state machine module.
import wx
import DecoratorStateMachine as dsm
class MyFrame(wx.Frame, dsm.ContextBase):
    """Frame that is the context for two state machines.

    'xtable' governs the controller state (instance attribute 'pstate') and
    'dtable' governs the door state (instance attribute 'dstate').
    """
    xtable = dsm.TransitionTable('pstate')
    dtable = dsm.TransitionTable('dstate')
    def __init__(self):
        # Create the 'pstate' and 'dstate' state variables on this instance.
        self.xtable.initialize(self)
        self.dtable.initialize(self)
        wx.Frame.__init__(self, None, -1, "My Frame", size=(410,250))
        family = wx.SWISS
        style = wx.NORMAL
        weight = wx.BOLD
        font = wx.Font(12,family,style,weight, False, "Verdana")
        self.SetFont(font)
        panel = wx.Panel(self, -1)
        # Each button is bound to one of the decorated methods below.
        self.btnDoor = self.makeButton(panel, "Door", (50,20), self.onToggleDoor)
        self.btnLight = self.makeButton(panel, "Light", (180,20), self.onLightOn )
        self.btnDrawer = self.makeButton(panel, "Drawer", (50,60), self.onOpenDrawer)
        self.btnPanel = self.makeButton(panel, "Panel", (180,60), self.onClosePanel)
        self.btnPanel.Disable()
        self.textArea = wx.StaticText(panel, -1, "Locked", pos=(50,100), size=(100,35))
        # onEnter called here would invoke MyFrame.onEnter (below)
        # call the current state's onEnter method indirectly through onInit()
        self.onInit()
    def onEnter(self):
        # Fallback only; states are expected to provide their own onEnter.
        print "Shouldn't get here. Should call some state's onEnter functions instead."
    # Door toggle: runs the door state's handler, then changes the door state.
    @dsm.transitionevent(dtable)
    def onToggleDoor(self, event): pass
    @dsm.event(dtable)
    def onInit(self):
        self.onEnter() # calls onEnter for current dtable/dstate state
    # The order of the @dsm.transition(xtable) methods below fixes the order
    # of the entries in the tuples passed to MyFrame.xtable.nextStates():
    # onOpenDoor, onCloseDoor, onLightOn, onOpenDrawer, onClosePanel.
    @dsm.transition(xtable)
    def onOpenDoor(self): pass
    @dsm.transition(xtable)
    def onCloseDoor(self): pass
    @dsm.transition(xtable)
    def onLightOn(self, event): pass
    @dsm.transition(xtable)
    def onOpenDrawer(self, event): pass
    @dsm.transition(xtable)
    def onClosePanel(self, event): pass
    def makeButton( self, panel, label, positn, handler ):
        """Create a button on panel, bind handler to its click event and return it."""
        button = wx.Button(panel, -1, label, pos=positn, size=(120,35))
        self.Bind(wx.EVT_BUTTON, handler, button)
        return button
class DoorOpen(MyFrame):
    """Door state: the door is open, so the button offers to close it."""

    doorLabel = "Close Door"

    def onEnter(self):
        """Print the door state's name and relabel the door button."""
        print(self.dstate.name())
        self.btnDoor.SetLabel(self.doorLabel)

    def onToggleDoor(self, event):
        """Translate the toggle into the controller's onCloseDoor transition."""
        self.onCloseDoor()
class DoorClosed(DoorOpen):
    """Door state: the door is closed; onEnter is inherited from DoorOpen."""

    doorLabel = "Open Door"

    def onToggleDoor(self, event):
        """Translate the toggle into the controller's onOpenDoor transition."""
        self.onOpenDoor()
# Door table: onToggleDoor is its only transition method, so each tuple has a
# single entry and the two door states simply alternate.
MyFrame.dtable.nextStates(DoorOpen, (DoorClosed,))
MyFrame.dtable.nextStates(DoorClosed, (DoorOpen,))
MyFrame.dtable.initialstate = DoorOpen
class Idle(MyFrame):
    """This is an initial state."""

    def onEnter(self):
        """Print the name of the current controller state."""
        state_name = self.pstate.name()
        print(state_name)
class Unlocked(Idle):
    """Controller state in which the panel is unlocked."""

    def onEnter(self):
        """Print the state name and switch the widgets to 'unlocked'."""
        print(self.pstate.name())
        panel_button, door_button = self.btnPanel, self.btnDoor
        panel_button.Enable()
        door_button.Disable()
        self.textArea.SetLabel("Unlocked")

    def onLeave(self):
        """Switch the widgets back to 'locked'."""
        self.textArea.SetLabel("Locked")
        panel_button, door_button = self.btnPanel, self.btnDoor
        panel_button.Disable()
        door_button.Enable()
class Active(Idle):
    """Controller state with no behavior of its own; inherits from Idle."""
class LightOn(Idle):
    """Controller state with no behavior of its own; inherits from Idle."""
class DrawerOpen(Idle):
    """Controller state with no behavior of its own; inherits from Idle."""
# Controller table. Tuple entries follow the order in which the xtable
# transition methods are declared in MyFrame:
#   (onOpenDoor, onCloseDoor, onLightOn, onOpenDrawer, onClosePanel)
# None means the controller stays in its current state for that method.
MyFrame.xtable.nextStates(Idle, (Idle, Active, Idle, Idle, Idle))
MyFrame.xtable.nextStates(Active, (Idle, Active, LightOn, DrawerOpen, None))
MyFrame.xtable.nextStates(LightOn, (Idle, None, None, Unlocked, None ))
MyFrame.xtable.nextStates(DrawerOpen, (Idle, None, Unlocked, None, None))
MyFrame.xtable.nextStates(Unlocked, (Idle, None, None, None, Idle))
MyFrame.xtable.initialstate = Idle
if __name__=='__main__':
    # Create the application and frame, then run the GUI event loop.
    app = wx.PySimpleApp()
    frame = MyFrame()
    frame.Show(True)
    app.MainLoop()
Explanation
There are actually two state variables in the context. One is for the state of the door, opened or closed. The other is for the main controller. The DoorOpen and DoorClosed states simply translate the onToggleDoor event to invoke either onCloseDoor or onOpenDoor.
The @transition decorator indicates the method can cause a state transition. The method body will be invoked if one is provided. The parameter on the decorator is the state table that governs the transition. When leaving a state, the state's onLeave method is called, if one is defined. When entering a state the state's onEnter method is called.
The @event decorator indicates the method is state dependent. The parameter on the decorator is used to determine which state variable (via the transition table) in the context (there may be multiple) contains the state whose method is to be invoked.
The @transitionevent decorator is a combination of the above two. A state dependent method is invoked, and it may cause a transition to a new state. The transition happens after the event method is invoked.
Since states are subclasses of the context, or subclasses of other states, the usual rules governing method and attribute resolution apply. For instance, DoorClosed is a subclass of DoorOpen, so when 'onEnter' of the DoorClosed state is called, it uses the one defined by DoorOpen. Since DoorClosed defines a different value for doorLabel, the correct label is set on the door button.
Alternative Miss Grant's Controller Example
import wx
import DecoratorStateMachine as dsm
class MyFrame(wx.Frame, dsm.ContextBase):
    """Frame that is the context for the door state machine ('dstate')."""
    dtable = dsm.TransitionTable('dstate')
    def __init__(self):
        # Create the 'dstate' state variable on this instance.
        self.dtable.initialize(self)
        wx.Frame.__init__(self, None, -1, "My Frame", size=(410,250))
        font = wx.Font(11, wx.SWISS, wx.NORMAL, wx.BOLD, False, "Verdana")
        self.SetFont(font)
        panel = wx.Panel(self, -1)
        # Each button is bound to one of the decorated methods below.
        self.btnDoor = self.makeButton(panel, "Door", (50,20), self.onToggleDoor)
        self.btnLight = self.makeButton(panel, "Light", (180,20), self.onLightOn )
        self.btnDrawer = self.makeButton(panel, "Drawer", (50,60), self.onOpenDrawer)
        self.btnPanel = self.makeButton(panel, "Panel", (180,60), self.onClosePanel)
        self.btnPanel.Disable()
        self.textArea = wx.StaticText(panel, -1, "Locked", pos=(50,100), size=(100,35))
        # onEnter called here would invoke MyFrame.onEnter (below)
        # call the current state's onEnter method indirectly through onInit()
        self.onInit()
    def onEnter(self):
        # Fallback only; states are expected to provide their own onEnter.
        print "Shouldn't get here. Should call some state's onEnter function instead."
    # onToggleDoor is the only transition method registered with dtable.
    @dsm.transition(dtable)
    def onToggleDoor(self, event): pass
    @dsm.event(dtable)
    def onInit(self):
        self.onEnter() # calls onEnter for current dtable/dstate state
    # The following are events on the door state: they are dispatched to the
    # current door state's implementation and default to doing nothing.
    @dsm.event(dtable)
    def onLightOn(self, event): pass
    @dsm.event(dtable)
    def onOpenDrawer(self, event): pass
    @dsm.event(dtable)
    def onClosePanel(self, event): pass
    def makeButton( self, panel, label, positn, handler ):
        """Create a button on panel, bind handler to its click event and return it."""
        button = wx.Button(panel, -1, label, pos=positn, size=(120,35))
        self.Bind(wx.EVT_BUTTON, handler, button)
        return button
class DoorOpen(MyFrame):
    """Door state: the door is open, so the button offers to close it."""

    doorLabel = "Close Door"

    def onEnter(self):
        """Print the door state's name and relabel the door button."""
        print(self.dstate.name())
        self.btnDoor.SetLabel(self.doorLabel)
class DoorClosed(DoorOpen):
    """Door state that is also the context for the controller states.

    It owns a second transition table, 'xtable', whose state variable is
    stored on the instance as 'pstate'.
    """
    doorLabel = "Open Door"
    xtable = dsm.TransitionTable('pstate')
    def onEnter(self):
        # Check self's class and return if it's not DoorClosed.
        # otherwise if one of the xtable substates hasn't defined 'onEnter', we
        # could go into infinite recursion.
        if self.__class__.__name__ != "MyFrame_DoorClosed":
            return
        DoorOpen.onEnter(self)
        # Re-create the 'pstate' state variable, which starts again from
        # xtable's initial state, then run that state's onEnter via doEnter().
        self.xtable.initialize(self)
        self.doEnter()
    @dsm.event(xtable)
    def doEnter(self):
        self.onEnter()
    # The order of the @dsm.transition(xtable) methods below fixes the order
    # of the entries in the tuples passed to DoorClosed.xtable.nextStates():
    # onLightOn, onOpenDrawer, onClosePanel.
    @dsm.transition(xtable)
    def onLightOn(self, event): pass
    @dsm.transition(xtable)
    def onOpenDrawer(self, event): pass
    @dsm.transition(xtable)
    def onClosePanel(self, event): pass
# Door table: onToggleDoor is its only transition method, so each tuple has a
# single entry and the two door states simply alternate.
MyFrame.dtable.nextStates(DoorOpen, (DoorClosed,))
MyFrame.dtable.nextStates(DoorClosed, (DoorOpen,))
MyFrame.dtable.initialstate = DoorOpen
class Active(DoorClosed):
    """Controller state; base class of the other controller states."""

    def onEnter(self):
        """Print the name of the current controller state."""
        state_name = self.pstate.name()
        print(state_name)
class LightOn(Active):
    """Controller state with no behavior of its own; inherits from Active."""
class DrawerOpen(Active):
    """Controller state with no behavior of its own; inherits from Active."""
class Idle(Active):
    """Controller state with no behavior of its own; inherits from Active."""
class Unlocked(Active):
    """Controller state in which the panel is unlocked."""

    def onEnter(self):
        """Print the state name and switch the widgets to 'unlocked'."""
        print(self.pstate.name())
        panel_button, door_button = self.btnPanel, self.btnDoor
        panel_button.Enable()
        door_button.Disable()
        self.textArea.SetLabel("Unlocked")

    def onLeave(self):
        """Switch the widgets back to 'locked'."""
        self.textArea.SetLabel("Locked")
        panel_button, door_button = self.btnPanel, self.btnDoor
        panel_button.Disable()
        door_button.Enable()
# Controller table. Tuple entries follow the order in which the xtable
# transition methods are declared in DoorClosed:
#   (onLightOn, onOpenDrawer, onClosePanel)
# None means the controller stays in its current state for that method.
DoorClosed.xtable.nextStates(Active, (LightOn, DrawerOpen, None))
DoorClosed.xtable.nextStates(LightOn, (None, Unlocked, None ))
DoorClosed.xtable.nextStates(DrawerOpen, (Unlocked, None, None))
DoorClosed.xtable.nextStates(Unlocked, (None, None, Idle))
DoorClosed.xtable.nextStates(Idle, (None, None, None))
DoorClosed.xtable.initialstate = Active
if __name__=='__main__':
    # Create the application and frame, then run the GUI event loop.
    app = wx.PySimpleApp()
    frame = MyFrame()
    frame.Show(True)
    app.MainLoop()
Explanation
There are also two state tables in this example. The difference is that the DoorClosed state acts as the context for the second set of states. The onEnter method of the DoorClosed state re-initializes the second state variable to Active. In the DoorClosed state, the onLightOn, onOpenDrawer, and onClosePanel methods can cause transitions on the second state variable (pstate). These methods are events on the first state variable (dstate), and when in the DoorOpen state, the events do not get invoked for the xtable related state.
